package com.fintech.pangu.elasticjob.core;

import com.dangdang.ddframe.job.api.ElasticJob;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.api.script.ScriptJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.JobTypeConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.config.script.ScriptJobConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import com.fintech.pangu.elasticjob.annotation.ElasticScheduledJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Scans the Spring context for beans annotated with {@link ElasticScheduledJob} and
 * initialises an elastic-job {@link SpringJobScheduler} for each of them.
 *
 * <p>The scan runs from {@link #afterSingletonsInstantiated()}, i.e. after every singleton
 * bean has been created. A failure while configuring one job is logged and does not stop
 * the remaining jobs from being registered.
 *
 * @author xujunqi
 * @since 1.0.0
 */
public class ElasticScheduledJobConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private static final Logger logger = LoggerFactory.getLogger(ElasticScheduledJobConfiguration.class);

    /**
     * Number of jobs whose scheduler was initialised successfully.
     */
    private final AtomicLong counter = new AtomicLong(0);

    private ApplicationContext applicationContext;

    private Environment environment;

    private ZookeeperRegistryCenter regCenter;

    /**
     * Optional job event (trace) configuration; when {@code null} schedulers are created without it.
     */
    private JobEventConfiguration jobEventConfiguration;

    /**
     * @param regCenter registry center handed to every scheduler created by this class
     */
    public ElasticScheduledJobConfiguration(ZookeeperRegistryCenter regCenter) {
        this.regCenter = regCenter;
    }

    public void setRegCenter(ZookeeperRegistryCenter regCenter) {
        this.regCenter = regCenter;
    }

    public void setJobEventConfiguration(JobEventConfiguration jobEventConfiguration) {
        this.jobEventConfiguration = jobEventConfiguration;
    }

    /**
     * Stores the context and its environment; the environment is used later to resolve
     * {@code ${...}} placeholders found in annotation attributes.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Only ApplicationContext-level methods are needed, so no down-cast is performed.
        this.applicationContext = applicationContext;
        this.environment = applicationContext.getEnvironment();
    }

    /**
     * Called by the container once all singleton beans have been instantiated;
     * triggers the job scan.
     */
    @Override
    public void afterSingletonsInstantiated() {
        init();
    }

    /**
     * Looks up all beans carrying {@link ElasticScheduledJob} and registers each one.
     */
    private void init() {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticScheduledJob.class);

        logger.info("开始扫描加载所有elastic-job任务实例, 总个数: {} 个", beans.size());
        beans.forEach(this::registerContainer);
        logger.info("完成扫描加载所有elastic-job任务实例, 总个数: {} 个, 成功加载: {} 个", beans.size(), counter.get());
    }

    /**
     * Builds the job configuration for one annotated bean and initialises its scheduler.
     *
     * <p>Configuration or scheduling errors are logged and swallowed so that the other jobs
     * still get registered; only successfully initialised jobs are counted.
     *
     * @param beanName name of the bean in the Spring context
     * @param bean     bean instance (possibly an AOP proxy)
     * @throws IllegalStateException if the bean's target class does not implement {@link ElasticJob}
     */
    private void registerContainer(String beanName, Object bean) {
        // Look through AOP proxies so the annotation is read from the user's class.
        Class<?> clazz = AopUtils.getTargetClass(bean);

        if (!ElasticJob.class.isAssignableFrom(clazz)) {
            throw new IllegalStateException(clazz + " 不是实例 " + ElasticJob.class.getName());
        }

        ElasticJob elasticJob = (ElasticJob) bean;
        ElasticScheduledJob annotation = clazz.getAnnotation(ElasticScheduledJob.class);
        if (annotation == null) {
            // The annotation is not present on the target class itself; nothing to configure.
            return;
        }

        logger.info("开始执行定时任务[jobClass={}]调度配置", clazz.getCanonicalName());

        try {
            LiteJobConfiguration liteJobConfiguration = buildLiteJobConfiguration(elasticJob, annotation);

            // Attach the event configuration only when one has been supplied.
            JobScheduler scheduler = jobEventConfiguration == null
                    ? new SpringJobScheduler(elasticJob, regCenter, liteJobConfiguration)
                    : new SpringJobScheduler(elasticJob, regCenter, liteJobConfiguration, jobEventConfiguration);
            scheduler.init();
        } catch (Exception e) {
            logger.error("定时任务[beanName={}, jobClass={}]调度配置失败", beanName, clazz.getCanonicalName(), e);
            // Do not count or report a failed job as successfully configured.
            return;
        }

        counter.incrementAndGet();
        if (annotation.disabled()) {
            logger.warn("定时任务[beanName={}, jobClass={}]调度配置成功, 但作业未启用", beanName, clazz.getCanonicalName());
        } else {
            logger.info("定时任务[beanName={}, jobClass={}]调度配置成功", beanName, clazz.getCanonicalName());
        }
    }

    /**
     * Translates the annotation attributes into a {@link LiteJobConfiguration}, resolving
     * environment placeholders in the string-valued attributes.
     *
     * @param elasticJob job instance being registered
     * @param annotation annotation read from the job's class
     * @return the lite job configuration for the job
     * @throws IllegalArgumentException if the cron expression is blank
     * @throws NumberFormatException    if the resolved sharding total count is not an integer
     * @throws IllegalStateException    if the job type is not supported
     */
    private LiteJobConfiguration buildLiteJobConfiguration(ElasticJob elasticJob, ElasticScheduledJob annotation) {
        final String cron = environment.resolvePlaceholders(annotation.cron());
        // A bare null check would let an empty expression through, so require real text.
        Assert.hasText(cron, "表达式不能为空");

        final int shardingTotalCount = Integer.parseInt(environment.resolvePlaceholders(annotation.shardingTotalCount()));
        final String shardingItemParameters = environment.resolvePlaceholders(annotation.shardingItemParameters());
        final String jobParameter = environment.resolvePlaceholders(annotation.jobParameter());
        final String description = environment.resolvePlaceholders(annotation.description());

        // NOTE(review): this is the runtime class of the bean; for a proxied bean it is the
        // proxy class, so the job name would be the proxy's class name - confirm this is intended.
        Class<?> jobClass = elasticJob.getClass();

        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .jobParameter(jobParameter)
                .failover(annotation.failover())
                .misfire(annotation.misfire())
                .description(description)
                .build();

        JobTypeConfiguration jobTypeConfiguration = buildJobTypeConfiguration(elasticJob, jobCoreConfiguration, jobClass);

        return LiteJobConfiguration.newBuilder(jobTypeConfiguration)
                .overwrite(annotation.overwrite())
                .disabled(annotation.disabled())
                .build();
    }

    /**
     * Chooses the type-specific configuration matching the interface the job implements.
     *
     * @param elasticJob           job instance being registered
     * @param jobCoreConfiguration core settings shared by all job types
     * @param jobClass             runtime class of the job instance
     * @return the type-specific configuration
     * @throws IllegalStateException if the job is none of SimpleJob, DataflowJob or ScriptJob
     */
    private JobTypeConfiguration buildJobTypeConfiguration(ElasticJob elasticJob,
                                                           JobCoreConfiguration jobCoreConfiguration,
                                                           Class<?> jobClass) {
        final String jobClassName = jobClass.getCanonicalName();

        if (elasticJob instanceof SimpleJob) {
            return new SimpleJobConfiguration(jobCoreConfiguration, jobClassName);
        }
        if (elasticJob instanceof DataflowJob) {
            // Third argument is the streaming-process flag of the dataflow configuration.
            return new DataflowJobConfiguration(jobCoreConfiguration, jobClassName, true);
        }
        if (elasticJob instanceof ScriptJob) {
            // NOTE(review): the second argument of ScriptJobConfiguration appears to be the script
            // command line, not a class name - verify against the elastic-job API before relying on it.
            return new ScriptJobConfiguration(jobCoreConfiguration, jobClassName);
        }

        // Fail with a clear message instead of a NullPointerException further down.
        throw new IllegalStateException(jobClass + " 不是受支持的作业类型(SimpleJob/DataflowJob/ScriptJob)");
    }
}